package org.voovan.korla;

import org.voovan.Global;
import org.voovan.korla.message.Msg;
import org.voovan.korla.message.MsgRegister;
import org.voovan.network.IoSession;
import org.voovan.network.SocketContext;
import org.voovan.tools.TPerformance;
import org.voovan.tools.TProperties;
import org.voovan.tools.UniqueId;
import org.voovan.tools.collection.CacheMap;
import org.voovan.tools.log.Logger;
import org.voovan.tools.serialize.TSerialize;

import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;

/**
 * Korla 静态持久类
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class KorlaStatic {

    public final static String DEFAULT_CHANNEL = "default";
    public final static String CHANNEL_STR = "CHANNEL";
    public final static String CONNECT_STR = "CONNECT";
    public final static String PING = "ping";
    public final static String PONG = "pong";

    public static Integer UNIQUE_SEED             = TProperties.getInt("korla","korla.seed", (int)(Math.random()*1023));
    public static Integer CLEAN_INTERVAL          = TProperties.getInt("korla","korla.cleanInterval", 30) * 1000;
    public static Integer CLEAN_BEFORE            = TProperties.getInt("korla","korla.cleanBefore", 60) * 1000;
    public static Integer MESSAGE_CACHE_SIZE      = TProperties.getInt("korla","korla.messageCacheSize", 65536);
    public static Integer MESSAGE_EXPIRE          = TProperties.getInt("korla","korla.messageExpire", 30);
    public static Integer IDEMPOTENCE_CACHE_SIZE  = TProperties.getInt("korla","korla.idempotenceCacheSize", 65536);
    public static Integer IDEMPOTENCE_EXPIRE      = TProperties.getInt("korla","korla.idempotenceExpire", 1) * 60;      //minute
    public static Integer CONNECT_FAILED_WAIT     = TProperties.getInt("korla","korla.connectFailedWait", 5);
    public static Boolean DEFAULT_IDEMPOTENCE     = TProperties.getBoolean("korla","korla.defaultIdempotence", false);
    public static Integer PROVIDER_RUNNER_COUNT   = TProperties.getInt("korla","korla.providerRunnerCount", TPerformance.getProcessorCount()*3);

    public static Integer BATCH_SIZE              = TProperties.getInt("korla","korla.batch.size", -1);
    public static Integer BATCH_INTERVALE         = TProperties.getInt("korla","korla.batch.interval", -1);

    public static UniqueId UNIQUE_ID = new UniqueId(UNIQUE_SEED);

    static {
        Logger.simplef("[KORLA] SEED: {}", UNIQUE_SEED);

        if(BATCH_INTERVALE >0) {
            Logger.simplef("[KORLA] BATCH_SIZE: {}", BATCH_SIZE);
            Logger.simplef("[KORLA] BATCH_INTERVALE: {}ms", BATCH_INTERVALE);
        }
    }

    //============================ 持久化 ============================
    public final static Map<Long, Msg>          MESSAGE;
    public final static Map<Long, Msg>          MESSAGE_IDEMPOTENCE;

    static {
        MESSAGE = new CacheMap<Long, Msg>()
                .interval(CLEAN_INTERVAL/1000)
                .maxSize(MESSAGE_CACHE_SIZE).autoRemove(true)
                .expire((CLEAN_BEFORE/1000L));

        MESSAGE_IDEMPOTENCE = new CacheMap<Long, Msg>()
                .interval(CLEAN_INTERVAL/1000)
                .maxSize(IDEMPOTENCE_CACHE_SIZE).autoRemove(true)
                .expire(IDEMPOTENCE_EXPIRE.longValue());
    }

    //============================ 内存 ============================
    public final static ConcurrentLinkedDeque<Msg> TIMEOUT_MSG = new ConcurrentLinkedDeque<Msg>();


    //只缓存同一个消息 ID 的最后一个Msg对象 REQ->RESP->ACK
    public final static CacheMap<Long, Msg> MESSAGE_CACHE    = new CacheMap<Long, Msg>()
            .maxSize(MESSAGE_CACHE_SIZE).autoRemove(true)
            .expire(MESSAGE_EXPIRE.longValue())
            .destory((key, msg) -> {
                if(msg!=null && !msg.isDone() && msg.isExpire()) {
                    if(!TIMEOUT_MSG.contains(msg)) {
                        msg.updateTimestamp();
                        TIMEOUT_MSG.offer(msg);
                    }
                    return null;
                } else {
                    return -1L;
                }
            }).create();

    /**
     * 处理超时消息
     * @param consumer 超时消息处理器
     */
    public static void onTimeoutMessage(Consumer<Msg> consumer) {
        Global.getThreadPool().execute(()->{
            Msg timeoutMsg = TIMEOUT_MSG.poll();
            if(timeoutMsg!=null) {
                consumer.accept(timeoutMsg);
            }
        });
    }

    //序列化类型注册
    public static void registerClass() {
        TSerialize.register(MsgRegister.class);
    }

    /**
     * 从会话获取当前通道名称
     * @param session 会话
     * @return 当前通道名称
     */
    public static String getChannel(IoSession session) {
        return (String)session.getAttribute(CHANNEL_STR);
    }
}
